package com.atuguigu.flink.Day01.Singlesensor;

import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Calendar;
import java.util.Random;

//此类为数据源的模拟,自定义数据源，对于并行，所以需要继承RichParallelSourceFunction
public class SensorSource extends RichParallelSourceFunction<SendsorReading> {
    private boolean running=true;

    //run函数来产生数据
    @Override
    public void run(SourceContext<SendsorReading> sourceContext) throws Exception {
        //实例化一个随机数发生器
        Random random = new Random();
        //设置10个传感器ID
        String[] sensorIds = new String[10];
        //设置温度
        Double[] curFTemp = new Double[10];
        for(int i = 0; i < 10; i++){
           sensorIds[i]="sensor_"+(i+1);//编号
            curFTemp[i]=65+(random.nextGaussian()*20);//每个传感器生成的随机温度
        }


        //在循环体里面发送数据
        while (running){
            //获取时间戳
            long curTime = Calendar.getInstance().getTimeInMillis();
            for (int i = 0; i < 10; i++){
                //产生随机数温度更方便些
                curFTemp[i]+=random.nextGaussian()*0.5;
                sourceContext.collect(new SendsorReading(sensorIds[i],curFTemp[i],curTime));
            }
        }


    }

    @Override
    public void cancel() {
        running=false;
    }
}
